In [1]:
#!/usr/bin/env python
"""
File : AvroSnappyIO.py
Author : Luca Menichetti <luca.menichetti AT cern dot ch>
Description: Converts a list of JSON documents into Avro files with Snappy compression;
a Spark SQLContext (or HiveContext) is required.
"""
import json
class AvroSnappyIO(object):
    """Write lists of JSON-serialisable documents as Snappy-compressed Avro.

    Wraps a SparkContext and a Spark SQL context (SQLContext or HiveContext).
    It writes through the ``com.databricks.spark.avro`` data source.
    """

    # Data source name handed to DataFrame.save.
    AVRO_FORMAT = "com.databricks.spark.avro"

    def __init__(self, sparkContext, sparkSQLContext):
        """
        sparkContext: SparkContext used to parallelize the serialised documents
        sparkSQLContext: SQLContext (or HiveContext) used to build the DataFrame
            and to set the Avro compression codec
        """
        self.sqlc = sparkSQLContext
        self.sc = sparkContext

    def file_write(self, fname, data, repartitionNumber=None, write_mode="append"):
        """Serialise every item of ``data`` to JSON and save it as Snappy-compressed Avro.

        fname: output folder name, usually a HDFS path
        data: an iterable of JSON-serialisable objects (e.g. dicts), one record each
        repartitionNumber: [optional] the number of partitions used to write the
            output; when falsy (None or 0) the DataFrame is saved as-is
        write_mode: save mode forwarded to DataFrame.save (default "append")

        Raises Exception if either the Spark context or the SQL context is missing.
        """
        if not self.sqlc or not self.sc:
            raise Exception("Both Spark Context and SQLContext must be available")
        json_lines = [json.dumps(doc) for doc in data]
        jsonDocsDF = self.sqlc.jsonRDD(self.sc.parallelize(json_lines))
        # Set the codec on this instance's SQL context. The previous code used a
        # notebook-global `sqlContext`, which may be undefined or a different context.
        self.sqlc.setConf("spark.sql.avro.compression.codec", "snappy")
        if repartitionNumber:
            jsonDocsDF = jsonDocsDF.repartition(repartitionNumber)
        jsonDocsDF.save(fname, self.AVRO_FORMAT, mode=write_mode)
In [2]:
# NOTE(review): `sc` and `sqlContext` are presumably injected by the PySpark kernel — confirm.
avro_snappy_IO = AvroSnappyIO(sparkContext=sc, sparkSQLContext=sqlContext)
In [3]:
def _sample_fwjr_record():
    """Return a freshly built copy of the sample FWJR document used in these write tests."""
    return {
        "PFNArrayRef": ["inputPFNs", "outputPFNs", "pfn"],
        "task": "/AbcCde_Task_Data_test_2882516/RECO",
        "skippedFiles": [1],
        "wmaid": "dd33065a5371dc8a2627d4ed4f38f87a",
        "wmats": "1.45746097721583E9",
        "fallbackFiles": [0],
        "LFNArray": [
            "/store/data/Run2011A/Cosmics/RAW/v1/157/157/157/527326916439-527326916439-527326916439.root",
            "/store/data/Run2011A/Cosmics/RAW/v1/459/459/459/938188751161-938188751161-938188751161.root",
            "/store/data/Run2011A/Cosmics/RAW/v1/991/991/991/823442742311-823442742311-823442742311.root",
            "/store/data/Run2011A/Cosmics/RAW/v1/524/524/524/472239962435-472239962435-472239962435.root",
        ],
        "meta_data": {
            "agent_ver": "1.0.14.pre5",
            "fwjr_id": "1-0",
            "host": "test.fnal.gov",
            "ts": 1456500229,
        },
        "PFNArray": [
            "root://eoscms.cern.ch//eos/cms/store/data/Run2011A/Cosmics/RAW/v1/000/160/960/E8099605-8853-E011-A848-0030487A18F2.root",
            "root://eoscms.cern.ch//eos/cms/store/unmerged/CMSSW_7_0_0_pre11/Cosmics/ALCARECO/DtCalib-RECOCOSD_TaskChain_Data_pile_up_test-v1/00000/ECCFE421-08CB-E511-9F4C-02163E017804.root",
        ],
        "LFNArrayRef": ["fallbackFiles", "outputLFNs", "lfn", "skippedFiles", "inputLFNs"],
        "stype": "mongodb",
    }

# Three identical but independent copies of the sample record. A generator
# expression is used so no loop variable leaks into the notebook namespace.
fwjr_array = list(_sample_fwjr_record() for _copy_idx in range(3))
In [5]:
# First write: the sample records go into a single partition of test-json2avro-snappy.
avro_snappy_IO.file_write("test-json2avro-snappy", fwjr_array, repartitionNumber=1)
In [6]:
%%bash
# List what file_write has saved so far under the test-json2avro-snappy folder.
hadoop fs -ls test-json2avro-snappy
In [7]:
# Second batch for the append test: three more independent copies of the same
# sample FWJR record. The dict literal is evaluated once per iteration, so each
# copy is a separate object.
fwjr_another_array = []
for copy_number in range(3):
    fwjr_another_array.append({
        "PFNArrayRef": ["inputPFNs", "outputPFNs", "pfn"],
        "task": "/AbcCde_Task_Data_test_2882516/RECO",
        "skippedFiles": [1],
        "wmaid": "dd33065a5371dc8a2627d4ed4f38f87a",
        "wmats": "1.45746097721583E9",
        "fallbackFiles": [0],
        "LFNArray": [
            "/store/data/Run2011A/Cosmics/RAW/v1/157/157/157/527326916439-527326916439-527326916439.root",
            "/store/data/Run2011A/Cosmics/RAW/v1/459/459/459/938188751161-938188751161-938188751161.root",
            "/store/data/Run2011A/Cosmics/RAW/v1/991/991/991/823442742311-823442742311-823442742311.root",
            "/store/data/Run2011A/Cosmics/RAW/v1/524/524/524/472239962435-472239962435-472239962435.root",
        ],
        "meta_data": {
            "agent_ver": "1.0.14.pre5",
            "fwjr_id": "1-0",
            "host": "test.fnal.gov",
            "ts": 1456500229,
        },
        "PFNArray": [
            "root://eoscms.cern.ch//eos/cms/store/data/Run2011A/Cosmics/RAW/v1/000/160/960/E8099605-8853-E011-A848-0030487A18F2.root",
            "root://eoscms.cern.ch//eos/cms/store/unmerged/CMSSW_7_0_0_pre11/Cosmics/ALCARECO/DtCalib-RECOCOSD_TaskChain_Data_pile_up_test-v1/00000/ECCFE421-08CB-E511-9F4C-02163E017804.root",
        ],
        "LFNArrayRef": ["fallbackFiles", "outputLFNs", "lfn", "skippedFiles", "inputLFNs"],
        "stype": "mongodb",
    })
del copy_number
In [8]:
# Second write into the same folder, relying on file_write's default write_mode="append".
avro_snappy_IO.file_write(
    "test-json2avro-snappy",
    fwjr_another_array,
    repartitionNumber=1,
)
In [9]:
%%bash
# List the test-json2avro-snappy folder again after the second (append-mode) write.
hadoop fs -ls test-json2avro-snappy
In [10]:
import json
In [11]:
# Load one production FWJR document and write it twice to the HDFS test folder.
# NOTE(review): absolute AFS path tied to the author's account — make configurable.
FWJR_PROD_PATH = '/afs/cern.ch/user/l/lmeniche/work-ws-link/tmp/fwjr_prod.json'
# Use a context manager so the file handle is closed instead of leaked.
with open(FWJR_PROD_PATH) as fwjr_prod_file:
    rec = json.load(fwjr_prod_file)
# Rebinds fwjr_array: from here on it holds two references to `rec`.
fwjr_array = [rec, rec]
avro_snappy_IO.file_write("test-json2avro-snappy", fwjr_array, 1)
In [13]:
# Write the same records to the local filesystem using an explicit file:// URI.
avro_snappy_IO.file_write(
    "file:///root/test-local-json2avro-snappy",
    fwjr_array,
    repartitionNumber=1,
)
In [14]:
%%bash
# Inspect the local folder written via the file:// URI above.
ls /root/test-local-json2avro-snappy/
In [ ]: